/*---------------------------------------------------------------------------------------------
 *  Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
 *  This file is a part of the ModelEngine Project.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/

package modelengine.fit.waterflow.domain.context.repo.flowsession;

import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow;
import modelengine.fit.waterflow.domain.context.FlowSession;
import modelengine.fit.waterflow.domain.context.Window;
import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo;
import modelengine.fitframework.inspection.Validation;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Manages data caching during flow execution and handles unified resource release upon session completion.
 * This repository centrally stores session-related data and ensures proper cleanup when sessions finish.
 *
 * @author 宋永坦
 * @since 2025-02-12
 */
public class FlowSessionRepo {
    /**
     * Stores flow session resources for coordinated management and release.
     * The outer map key is flow identifier, inner map key is session unique identifier.
     */
    private static final Map<String, Map<String, FlowSessionCache>> cache = new ConcurrentHashMap<>();

    /**
     * Retrieves the next session for data propagation to downstream nodes.
     *
     * @param flowId The unique identifier of the flow.
     * @param session The current session context.
     * @return The next session for data transmission.
     */
    public static FlowSession getNextToSession(String flowId, FlowSession session) {
        Validation.notNull(flowId, "Flow id cannot be null.");
        Validation.notNull(session, "Session cannot be null.");
        return getFlowSessionCache(flowId, session)
                .getNextToSession(session);
    }

    /**
     * Retrieves the session for handling emitter operations in the next processing step.
     *
     * @param flowId The unique identifier of the flow.
     * @param session The current session context.
     * @return The session configured for emitter handling.
     */
    public static FlowSession getNextEmitterHandleSession(String flowId, FlowSession session) {
        Validation.notNull(flowId, "Flow id cannot be null.");
        Validation.notNull(session, "Session cannot be null.");
        return getFlowSessionCache(flowId, session)
                .getNextEmitterHandleSession(session);
    }

    /**
     * Gets the next accumulation order number for the specified node.
     *
     * @param flowId The unique identifier of the flow.
     * @param nodeId The target node identifier.
     * @param session The current session context.
     * @return The next accumulation sequence number.
     */
    public static int getNextAccOrder(String flowId, String nodeId, FlowSession session) {
        Validation.notNull(flowId, "Flow id cannot be null.");
        Validation.notNull(nodeId, "Node id cannot be null.");
        Validation.notNull(session, "Session cannot be null.");
        return getFlowSessionCache(flowId, session).getNextAccOrder(nodeId);
    }

    /**
     * Retrieves the {@link FlatMapSourceWindow} generated by a flatMap node operation.
     *
     * @param flowId The unique identifier of the flow.
     * @param window The input window entering the flatMap node.
     * @param repo The flow context persistence repository.
     * @return The corresponding {@link FlatMapSourceWindow} instance.
     */
    public static FlatMapSourceWindow getFlatMapSource(String flowId, Window window, FlowContextRepo repo) {
        Validation.notNull(flowId, "Flow id cannot be null.");
        Validation.notNull(window, "Window cannot be null.");
        Validation.notNull(window.getSession(), "Session cannot be null.");
        Validation.notNull(repo, "Repo cannot be null.");
        return getFlowSessionCache(flowId, window.getSession())
                .getFlatMapSourceWindow(window, repo);
    }

    /**
     * Releases all resources associated with a specific flow session.
     *
     * @param flowId The unique identifier of the flow.
     * @param session The target session for resource cleanup.
     */
    public static void release(String flowId, FlowSession session) {
        Validation.notNull(flowId, "Flow id cannot be null.");
        Validation.notNull(session, "Session cannot be null.");
        cache.compute(flowId, (__, value) -> {
            if (value == null) {
                return null;
            }
            value.remove(session.getId());
            if (value.isEmpty()) {
                return null;
            }
            return value;
        });
    }

    private static FlowSessionCache getFlowSessionCache(String flowId, FlowSession session) {
        return cache.compute(flowId, (__, value) -> {
            Map<String, FlowSessionCache> sessionCacheMap = value;
            if (sessionCacheMap == null) {
                sessionCacheMap = new ConcurrentHashMap<>();
            }
            sessionCacheMap.computeIfAbsent(session.getId(), id -> new FlowSessionCache());
            return sessionCacheMap;
        }).get(session.getId());
    }

    private static class FlowSessionCache {
        /**
         * 记录每个节点向下个节点流转数据时，下个节点使用的 session，用于将同一批数据汇聚。
         * 其中索引为当前节点正在处理数据的窗口的唯一标识。
         */
        private final Map<UUID, FlowSession> nextToSessions = new ConcurrentHashMap<>();

        private final Map<UUID, FlowSession> nextEmitterHandleSessions = new ConcurrentHashMap<>();

        /**
         * 记录流程中经过 flatMap 节点产生的窗口信息，用于将同一批数据汇聚。
         * 其中索引为当前节点正在处理数据的窗口的唯一标识。
         */
        private final Map<UUID, FlatMapSourceWindow> flatMapSourceWindows = new ConcurrentHashMap<>();

        private final Map<String, Integer> accOrders = new ConcurrentHashMap<>();

        private FlowSession getNextToSession(FlowSession session) {
            return this.nextToSessions.computeIfAbsent(session.getWindow().key(), __ -> generateNextSession(session));
        }

        private FlowSession getNextEmitterHandleSession(FlowSession session) {
            return this.nextEmitterHandleSessions.computeIfAbsent(session.getWindow().key(), __ -> {
                FlowSession next = FlowSession.newRootSession(session, session.preserved());
                Window nextWindow = next.begin();
                // if the processor is not reduce, then inherit previous window condition
                if (!session.isAccumulator()) {
                    nextWindow.setCondition(session.getWindow().getCondition());
                }
                return next;
            });
        }

        private FlatMapSourceWindow getFlatMapSourceWindow(Window window, FlowContextRepo repo) {
            return this.flatMapSourceWindows.computeIfAbsent(window.key(), __ -> {
                FlatMapSourceWindow newWindow = new FlatMapSourceWindow(window, repo);
                newWindow.setSession(new FlowSession(window.getSession().preserved()));
                newWindow.getSession().setWindow(newWindow);
                newWindow.getSession().begin();
                return newWindow;
            });
        }

        private int getNextAccOrder(String nodeId) {
            return this.accOrders.compute(nodeId, (key, value) -> {
                if (value == null) {
                    return 0;
                }
                return value + 1;
            });
        }

        private static FlowSession generateNextSession(FlowSession session) {
            FlowSession next = new FlowSession(session);
            Window nextWindow = next.begin();
            // if the processor is not reduce, then inherit previous window condition
            if (!session.isAccumulator()) {
                nextWindow.setCondition(session.getWindow().getCondition());
            }
            return next;
        }
    }
}
